// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rules

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/url"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"
	"golang.org/x/sync/semaphore"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/rulefmt"
	"github.com/prometheus/prometheus/notifier"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/strutil"
)

// QueryFunc processes PromQL queries.
type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error)

// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable) QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
		if err != nil {
			return nil, err
		}
		res := q.Exec(ctx)
		if res.Err != nil {
			return nil, res.Err
		}
		switch v := res.Value.(type) {
		case promql.Vector:
			return v, nil
		case promql.Scalar:
			return promql.Vector{promql.Sample{
				T:      v.T,
				F:      v.V,
				Metric: labels.Labels{},
			}}, nil
		default:
			return nil, errors.New("rule result is not a vector or scalar")
		}
	}
}

// DefaultEvalIterationFunc is the default implementation of
// GroupEvalIterationFunc that is periodically invoked to evaluate the rules
// in a group at a given point in time and updates Group state and metrics
// accordingly. Custom GroupEvalIterationFunc implementations are recommended
// to invoke this function as well, to ensure correct Group state and metrics
// are maintained.
func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.Time) {
	g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()

	start := time.Now()
	g.Eval(ctx, evalTimestamp)
	timeSinceStart := time.Since(start)

	g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
	g.setEvaluationTime(timeSinceStart)
	g.setLastEvaluation(start)
	g.setLastEvalTimestamp(evalTimestamp)
}

// The Manager manages recording and alerting rules.
type Manager struct {
	opts     *ManagerOptions
	groups   map[string]*Group
	mtx      sync.RWMutex
	block    chan struct{}
	done     chan struct{}
	restored bool

	logger *slog.Logger
}

// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)

// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
	ExternalURL               *url.URL
	QueryFunc                 QueryFunc
	NotifyFunc                NotifyFunc
	Context                   context.Context
	Appendable                storage.Appendable
	Queryable                 storage.Queryable
	Logger                    *slog.Logger
	Registerer                prometheus.Registerer
	OutageTolerance           time.Duration
	ForGracePeriod            time.Duration
	ResendDelay               time.Duration
	GroupLoader               GroupLoader
	DefaultRuleQueryOffset    func() time.Duration
	MaxConcurrentEvals        int64
	ConcurrentEvalsEnabled    bool
	RuleConcurrencyController RuleConcurrencyController
	RuleDependencyController  RuleDependencyController

	Metrics *Metrics
}

// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) *Manager {
	if o.Metrics == nil {
		o.Metrics = NewGroupMetrics(o.Registerer)
	}

	if o.GroupLoader == nil {
		o.GroupLoader = FileLoader{}
	}

	if o.RuleConcurrencyController == nil {
		if o.ConcurrentEvalsEnabled {
			o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals)
		} else {
			o.RuleConcurrencyController = sequentialRuleEvalController{}
		}
	}

	if o.RuleDependencyController == nil {
		o.RuleDependencyController = ruleDependencyController{}
	}

	if o.Logger == nil {
		o.Logger = promslog.NewNopLogger()
	}

	m := &Manager{
		groups: map[string]*Group{},
		opts:   o,
		block:  make(chan struct{}),
		done:   make(chan struct{}),
		logger: o.Logger,
	}

	return m
}

// Run starts processing of the rule manager. It is blocking.
func (m *Manager) Run() {
	m.logger.Info("Starting rule manager...")
	m.start()
	<-m.done
}

func (m *Manager) start() {
	close(m.block)
}

// Stop the rule manager's rule evaluation cycles.
func (m *Manager) Stop() {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	m.logger.Info("Stopping rule manager...")

	for _, eg := range m.groups {
		eg.stop()
	}

	// Shut down the groups waiting multiple evaluation intervals to write
	// staleness markers.
	close(m.done)

	m.logger.Info("Rule manager stopped")
}

// Update the rule manager's state as the config requires. If
// loading the new rules failed the old rule set is restored.
// This method will no-op in case the manager is already stopped.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	// We cannot update a stopped manager
	select {
	case <-m.done:
		return nil
	default:
	}

	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)

	if errs != nil {
		for _, e := range errs {
			m.logger.Error("loading groups failed", "err", e)
		}
		return errors.New("error loading rules, previous rule set restored")
	}
	m.restored = true

	var wg sync.WaitGroup
	for _, newg := range groups {
		// If there is an old group with the same identifier,
		// check if new group equals with the old group, if yes then skip it.
		// If not equals, stop it and wait for it to finish the current iteration.
		// Then copy it into the new group.
		gn := GroupKey(newg.file, newg.name)
		oldg, ok := m.groups[gn]
		delete(m.groups, gn)

		if ok && oldg.Equals(newg) {
			groups[gn] = oldg
			continue
		}

		wg.Add(1)
		go func(newg *Group) {
			if ok {
				oldg.stop()
				newg.CopyState(oldg)
			}
			wg.Done()
			// Wait with starting evaluation until the rule manager
			// is told to run. This is necessary to avoid running
			// queries against a bootstrapping storage.
			<-m.block
			newg.run(m.opts.Context)
		}(newg)
	}

	// Stop remaining old groups.
	wg.Add(len(m.groups))
	for n, oldg := range m.groups {
		go func(n string, g *Group) {
			g.markStale = true
			g.stop()
			if m := g.metrics; m != nil {
				m.IterationsMissed.DeleteLabelValues(n)
				m.IterationsScheduled.DeleteLabelValues(n)
				m.EvalTotal.DeleteLabelValues(n)
				m.EvalFailures.DeleteLabelValues(n)
				m.GroupInterval.DeleteLabelValues(n)
				m.GroupLastEvalTime.DeleteLabelValues(n)
				m.GroupLastDuration.DeleteLabelValues(n)
				m.GroupRules.DeleteLabelValues(n)
				m.GroupSamples.DeleteLabelValues((n))
			}
			wg.Done()
		}(n, oldg)
	}

	wg.Wait()
	m.groups = groups

	return nil
}

// GroupLoader is responsible for loading rule groups from arbitrary sources and parsing them.
type GroupLoader interface {
	Load(identifier string) (*rulefmt.RuleGroups, []error)
	Parse(query string) (parser.Expr, error)
}

// FileLoader is the default GroupLoader implementation. It defers to rulefmt.ParseFile
// and parser.ParseExpr.
type FileLoader struct{}

func (FileLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) {
	return rulefmt.ParseFile(identifier)
}

func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.ParseExpr(query) }

// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(
	interval time.Duration, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc, filenames ...string,
) (map[string]*Group, []error) {
	groups := make(map[string]*Group)

	shouldRestore := !m.restored

	for _, fn := range filenames {
		rgs, errs := m.opts.GroupLoader.Load(fn)
		if errs != nil {
			return nil, errs
		}

		for _, rg := range rgs.Groups {
			itv := interval
			if rg.Interval != 0 {
				itv = time.Duration(rg.Interval)
			}

			rules := make([]Rule, 0, len(rg.Rules))
			for _, r := range rg.Rules {
				expr, err := m.opts.GroupLoader.Parse(r.Expr.Value)
				if err != nil {
					return nil, []error{fmt.Errorf("%s: %w", fn, err)}
				}

				mLabels := FromMaps(rg.Labels, r.Labels)

				if r.Alert.Value != "" {
					rules = append(rules, NewAlertingRule(
						r.Alert.Value,
						expr,
						time.Duration(r.For),
						time.Duration(r.KeepFiringFor),
						mLabels,
						labels.FromMap(r.Annotations),
						externalLabels,
						externalURL,
						m.restored,
						m.logger.With("alert", r.Alert),
					))
					continue
				}
				rules = append(rules, NewRecordingRule(
					r.Record.Value,
					expr,
					mLabels,
				))
			}

			// Check dependencies between rules and store it on the Rule itself.
			m.opts.RuleDependencyController.AnalyseRules(rules)

			groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
				Name:              rg.Name,
				File:              fn,
				Interval:          itv,
				Limit:             rg.Limit,
				Rules:             rules,
				ShouldRestore:     shouldRestore,
				Opts:              m.opts,
				QueryOffset:       (*time.Duration)(rg.QueryOffset),
				done:              m.done,
				EvalIterationFunc: groupEvalIterationFunc,
			})
		}
	}

	return groups, nil
}

// RuleGroups returns the list of manager's rule groups.
func (m *Manager) RuleGroups() []*Group {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	rgs := make([]*Group, 0, len(m.groups))
	for _, g := range m.groups {
		rgs = append(rgs, g)
	}

	slices.SortFunc(rgs, func(a, b *Group) int {
		fileCompare := strings.Compare(a.file, b.file)

		// If its 0, then the file names are the same.
		// Lets look at the group names in that case.
		if fileCompare != 0 {
			return fileCompare
		}
		return strings.Compare(a.name, b.name)
	})

	return rgs
}

// Rules returns the list of the manager's rules.
func (m *Manager) Rules(matcherSets ...[]*labels.Matcher) []Rule {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	var rules []Rule
	for _, g := range m.groups {
		rules = append(rules, g.Rules(matcherSets...)...)
	}

	return rules
}

// AlertingRules returns the list of the manager's alerting rules.
func (m *Manager) AlertingRules() []*AlertingRule {
	alerts := []*AlertingRule{}
	for _, rule := range m.Rules() {
		if alertingRule, ok := rule.(*AlertingRule); ok {
			alerts = append(alerts, alertingRule)
		}
	}

	return alerts
}

type Sender interface {
	Send(alerts ...*notifier.Alert)
}

// SendAlerts implements the rules.NotifyFunc for a Notifier.
func SendAlerts(s Sender, externalURL string) NotifyFunc {
	return func(ctx context.Context, expr string, alerts ...*Alert) {
		var res []*notifier.Alert

		for _, alert := range alerts {
			a := &notifier.Alert{
				StartsAt:     alert.FiredAt,
				Labels:       alert.Labels,
				Annotations:  alert.Annotations,
				GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
			}
			if !alert.ResolvedAt.IsZero() {
				a.EndsAt = alert.ResolvedAt
			} else {
				a.EndsAt = alert.ValidUntil
			}
			res = append(res, a)
		}

		if len(alerts) > 0 {
			s.Send(res...)
		}
	}
}

// RuleDependencyController controls whether a set of rules have dependencies between each other.
type RuleDependencyController interface {
	// AnalyseRules analyses dependencies between the input rules. For each rule that it's guaranteed
	// not having any dependants and/or dependency, this function should call Rule.SetNoDependentRules(true)
	// and/or Rule.SetNoDependencyRules(true).
	AnalyseRules(rules []Rule)
}

type ruleDependencyController struct{}

// AnalyseRules implements RuleDependencyController.
func (c ruleDependencyController) AnalyseRules(rules []Rule) {
	depMap := buildDependencyMap(rules)
	for _, r := range rules {
		r.SetNoDependentRules(depMap.dependents(r) == 0)
		r.SetNoDependencyRules(depMap.dependencies(r) == 0)
	}
}

// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently.
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
	// Allow determines if the given rule is allowed to be evaluated concurrently.
	// If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done.
	// It is important that both *Group and Rule are not retained and only be used for the duration of the call.
	Allow(ctx context.Context, group *Group, rule Rule) bool

	// Done releases a concurrent evaluation slot.
	Done(ctx context.Context)
}

// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct {
	sema *semaphore.Weighted
}

func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController {
	return &concurrentRuleEvalController{
		sema: semaphore.NewWeighted(maxConcurrency),
	}
}

func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool {
	// To allow a rule to be executed concurrently, we need 3 conditions:
	// 1. The rule must not have any rules that depend on it.
	// 2. The rule itself must not depend on any other rules.
	// 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot.
	if rule.NoDependentRules() && rule.NoDependencyRules() {
		return c.sema.TryAcquire(1)
	}

	return false
}

func (c *concurrentRuleEvalController) Done(_ context.Context) {
	c.sema.Release(1)
}

// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
type sequentialRuleEvalController struct{}

func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) bool {
	return false
}

func (c sequentialRuleEvalController) Done(_ context.Context) {}

// FromMaps returns new sorted Labels from the given maps, overriding each other in order.
func FromMaps(maps ...map[string]string) labels.Labels {
	mLables := make(map[string]string)

	for _, m := range maps {
		for k, v := range m {
			mLables[k] = v
		}
	}

	return labels.FromMap(mLables)
}
